home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.3)
-
- from sha import sha
- from bisect import bisect_right
-
class Storage:
    """Expose a sequence of files as one contiguous range of bytes.

    Each non-empty file occupies a half-open span ``[begin, end)`` of a
    single flat offset space, in the order the files were given.  Reads
    and writes are addressed by flat offset and are split across the
    underlying files as needed.

    Attributes set by the constructor:

    ranges        -- list of ``(begin, end, file)`` for every file whose
                     length is non-zero
    begins        -- the ``begin`` of every entry in ``ranges`` (used for
                     bisection)
    total_length  -- sum of all file lengths
    handles       -- maps file -> currently open handle
    whandles      -- maps file -> 1 for every handle opened writable
    tops          -- maps file -> size reported by ``getsize`` for files
                     that already existed when the handles were opened
    """

    def __init__(self, files, open, exists, getsize):
        """Lay out ``files`` and open a handle for each of them.

        files    -- sequence of ``(file, length)`` pairs
        open     -- callable ``open(file, mode)`` returning a file-like
                    object (seek/read/write/flush/close)
        exists   -- callable ``exists(file)`` returning a boolean
        getsize  -- callable ``getsize(file)`` returning the current size

        Any exception raised by the supplied callables propagates.
        """
        # Remember the injected opener: set_readonly() and write() have to
        # reopen files later and must go through the same abstraction.
        self._open = open
        self.ranges = []
        total = 0
        for name, length in files:
            if length != 0:
                self.ranges.append((total, total + length, name))
                total += length
            elif not exists(name):
                # A zero-length file holds no data but must still be created.
                open(name, 'wb').close()
        self.begins = [span[0] for span in self.ranges]
        self.total_length = total
        self.handles = {}
        self.whandles = {}
        self.tops = {}
        for name, length in files:
            if exists(name):
                size = getsize(name)
                if size != length:
                    # Size differs from the expected length: the file will
                    # have to be written to, so open it updatable.
                    self.handles[name] = open(name, 'rb+')
                    self.whandles[name] = 1
                else:
                    self.handles[name] = open(name, 'rb')
                self.tops[name] = size
            else:
                self.handles[name] = open(name, 'wb+')
                self.whandles[name] = 1

    def was_preallocated(self, pos, length):
        """Return True if every file touched by ``[pos, pos + length)``
        was already at least that large when the storage was opened.

        Files that did not exist at construction count as size 0.
        """
        for name, begin, end in self._intervals(pos, length):
            if self.tops.get(name, 0) < end:
                return False
        return True

    def set_readonly(self):
        """Flush every writable handle and reopen it in ``'rb'`` mode."""
        for name in list(self.whandles):
            old = self.handles[name]
            old.flush()
            old.close()
            self.handles[name] = self._open(name, 'rb')
        # Nothing is writable any more; forgetting the old marks lets a
        # later write() reopen the file 'rb+' instead of writing to a
        # read-only handle.
        self.whandles = {}

    def get_total_length(self):
        """Return the combined length of all files."""
        return self.total_length

    def _intervals(self, pos, amount):
        """Split the flat span ``[pos, pos + amount)`` by file.

        Returns a list of ``(file, begin, end)`` where ``begin`` and
        ``end`` are offsets inside that file.
        """
        r = []
        stop = pos + amount
        p = bisect_right(self.begins, pos) - 1
        if p < 0:
            # No range starts at or before pos (no ranges at all, or pos is
            # negative); a negative index would wrap to the last range.
            p = 0
        while p < len(self.ranges) and self.ranges[p][0] < stop:
            begin, end, name = self.ranges[p]
            r.append((name, max(pos, begin) - begin, min(end, stop) - begin))
            p += 1
        return r

    def read(self, pos, amount):
        """Return ``amount`` bytes starting at flat offset ``pos``.

        The pieces read from each file are joined into one string.
        """
        pieces = []
        for name, begin, end in self._intervals(pos, amount):
            h = self.handles[name]
            h.seek(begin)
            pieces.append(h.read(end - begin))
        return ''.join(pieces)

    def write(self, pos, s):
        """Write ``s`` at flat offset ``pos``, spanning files as needed.

        A file whose handle is not writable is closed and reopened in
        ``'rb+'`` mode first.
        """
        offset = 0
        for name, begin, end in self._intervals(pos, len(s)):
            if name not in self.whandles:
                self.handles[name].close()
                self.handles[name] = self._open(name, 'rb+')
                self.whandles[name] = 1
            h = self.handles[name]
            h.seek(begin)
            h.write(s[offset:offset + end - begin])
            offset += end - begin

    def close(self):
        """Close every open handle."""
        for h in self.handles.values():
            h.close()
-
-
-
-
def lrange(a, b, c):
    """Return the list ``[a, a + c, a + 2*c, ...]`` of values below ``b``.

    a  -- first value
    b  -- exclusive upper bound
    c  -- positive increment

    Returns an empty list when ``a >= b``.  Raises ValueError when
    ``a < b`` and ``c`` is not positive, because the sequence would never
    reach ``b`` and the loop would not terminate.
    """
    if a < b and c <= 0:
        raise ValueError('lrange() step must be positive')
    r = []
    while a < b:
        r.append(a)
        a += c
    return r
-
- from fakeopen import FakeOpen
-
def test_Storage_simple():
    """Overlapping writes into one new five-byte file read back correctly."""
    fake = FakeOpen()
    storage = Storage([('a', 5)], fake.open, fake.exists, fake.getsize)
    if not fake.files.keys() == ['a']:
        raise AssertionError
    # (write offset, data written, read offset, read length, expected read)
    steps = [
        (0, 'abc', 0, 3, 'abc'),
        (2, 'abc', 2, 3, 'abc'),
        (1, 'abc', 0, 5, 'aabcc'),
    ]
    for write_pos, data, read_pos, read_len, expected in steps:
        storage.write(write_pos, data)
        if not storage.read(read_pos, read_len) == expected:
            raise AssertionError
-
-
def test_Storage_multiple():
    """Writes that straddle file boundaries land in the right files."""
    fake = FakeOpen()
    layout = [('a', 5), ('2', 4), ('c', 3)]
    storage = Storage(layout, fake.open, fake.exists, fake.getsize)
    names = fake.files.keys()
    names.sort()
    if not names == ['2', 'a', 'c']:
        raise AssertionError
    # (write offset, data written, read offset, read length, expected read)
    steps = [
        (3, 'abc', 3, 3, 'abc'),
        (5, 'ab', 4, 3, 'bab'),
        (3, 'pqrstuvw', 3, 8, 'pqrstuvw'),
        (3, 'abcdef', 3, 7, 'abcdefv'),
    ]
    for write_pos, data, read_pos, read_len, expected in steps:
        storage.write(write_pos, data)
        if not storage.read(read_pos, read_len) == expected:
            raise AssertionError
-
-
def test_Storage_zero():
    """A missing zero-length file is created empty."""
    fake = FakeOpen()
    Storage([('a', 0)], fake.open, fake.exists, fake.getsize)
    expected = {'a': []}
    if not fake.files == expected:
        raise AssertionError
-
-
def test_resume_zero():
    """An already-present zero-length file is left empty."""
    fake = FakeOpen({'a': ''})
    Storage([('a', 0)], fake.open, fake.exists, fake.getsize)
    expected = {'a': []}
    if not fake.files == expected:
        raise AssertionError
-
-
def test_Storage_with_zero():
    """A zero-length file between two others is skipped by reads/writes."""
    fake = FakeOpen()
    layout = [('a', 3), ('b', 0), ('c', 3)]
    storage = Storage(layout, fake.open, fake.exists, fake.getsize)
    storage.write(2, 'abc')
    if not storage.read(2, 3) == 'abc':
        raise AssertionError
    names = fake.files.keys()
    names.sort()
    if not names == ['a', 'b', 'c']:
        raise AssertionError
    # 'a' is filled to its full length; 'b' receives no data at all.
    for name, expected_size in (('a', 3), ('b', 0)):
        if not len(fake.files[name]) == expected_size:
            raise AssertionError
-
-
def test_Storage_resume():
    """Data already present in a partially complete file can be read."""
    fake = FakeOpen({'a': 'abc'})
    storage = Storage([('a', 4)], fake.open, fake.exists, fake.getsize)
    present = fake.files.keys()
    if not present == ['a']:
        raise AssertionError
    recovered = storage.read(0, 3)
    if not recovered == 'abc':
        raise AssertionError
-
-
def test_Storage_mixed_resume():
    """A new file and a pre-existing file can be combined in one storage."""
    fake = FakeOpen({'b': 'abc'})
    layout = [('a', 3), ('b', 4)]
    storage = Storage(layout, fake.open, fake.exists, fake.getsize)
    names = fake.files.keys()
    names.sort()
    if not names == ['a', 'b']:
        raise AssertionError
    # Flat offset 3 is the first byte of 'b', which already held 'abc'.
    recovered = storage.read(3, 3)
    if not recovered == 'abc':
        raise AssertionError
-
-